Quick Start#
This quick guide shows how you can develop a scalable forecasting system using ForecastFlowML.
Scenario#
The dataset consists of 3 regions, and you want to develop an individual model for each one.
Each region's data is small enough to fit into a single machine's memory, but the combined data for all regions is large enough to cause memory issues.
What We Will Do#
Build an independent model for each of the 3 regions.
Parallelize the training and inference steps.
Use LightGBM as the machine learning algorithm.
Develop direct multi-step forecasting using LightGBM.
Perform backtesting.
Import packages#
# Project, ML, Spark, and plotting imports used throughout the guide.
from forecastflowml.meta_model import ForecastFlowML
from forecastflowml.preprocessing import FeatureExtractor
from forecastflowml.data.loader import load_walmart_m5
from lightgbm import LGBMRegressor
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pandas as pd
import plotly.express as px
import plotly.io as pio
# Render plotly figures inline in the notebook.
pio.renderers.default = "notebook"
# Allow wide feature DataFrames to display without truncating columns.
pd.set_option('display.max_columns', 100)
Initialize Spark#
# Start a local Spark session: 4 worker threads, 8 GB of driver memory,
# and only 4 shuffle partitions (plenty for this small demo dataset).
_builder = SparkSession.builder.master("local[4]")
for _option, _setting in (
    ("spark.driver.memory", "8g"),
    ("spark.sql.shuffle.partitions", "4"),
):
    _builder = _builder.config(_option, _setting)
spark = _builder.getOrCreate()
Sample Dataset#
# Load the sample Walmart M5 dataset; localCheckpoint() materializes it and
# truncates the lineage so later transformations start from cached data.
df = load_walmart_m5(spark).localCheckpoint()
# Peek at a few rows (limit() keeps the Spark -> pandas transfer small).
df.limit(10).toPandas().head(5)
| id | item_id | dept_id | cat_id | store_id | state_id | sales | date | christmas | |
|---|---|---|---|---|---|---|---|---|---|
| 0 | FOODS_1_013_TX_2_evaluation | FOODS_1_013 | FOODS_1 | FOODS | TX_2 | TX | 2.0 | 2011-01-29 | 0 |
| 1 | FOODS_1_013_TX_2_evaluation | FOODS_1_013 | FOODS_1 | FOODS | TX_2 | TX | 5.0 | 2011-01-30 | 0 |
| 2 | FOODS_1_013_TX_2_evaluation | FOODS_1_013 | FOODS_1 | FOODS | TX_2 | TX | 3.0 | 2011-01-31 | 0 |
| 3 | FOODS_1_013_TX_2_evaluation | FOODS_1_013 | FOODS_1 | FOODS | TX_2 | TX | 0.0 | 2011-02-01 | 0 |
| 4 | FOODS_1_013_TX_2_evaluation | FOODS_1_013 | FOODS_1 | FOODS | TX_2 | TX | 0.0 | 2011-02-02 | 0 |
Feature Engineering#
# Configure lag, rolling-window, calendar, and demand-gap features.
feature_extractor = FeatureExtractor(
    id_col="id",          # column identifying each individual time series
    date_col="date",      # time index column
    target_col="sales",   # value to lag/aggregate
    lag_window_features={
        # Weekly-spaced target lags: 7, 14, ..., 56 days back.
        "lag": [7 * (i + 1) for i in range(8)],
        # Rolling means over 7/14/30-day windows, each shifted back by
        # 7/14/21/28 days so no future information leaks into a forecast.
        "mean": [
            [window, lag] for lag in [7, 14, 21, 28] for window in [7, 14, 30]
        ],
    },
    # Standard calendar features derived from the date column.
    date_features=[
        "day_of_month",
        "day_of_week",
        "week_of_year",
        "quarter",
        "month",
        "year",
    ],
    # Length of the most recent run of zero sales, observed as of each lag;
    # captures stock-outs / dormant products.
    count_consecutive_values={
        "value": 0,
        "lags": [7, 14, 21, 28],
    },
    # Days elapsed since the series first appeared.
    history_length=True,
)
# Build the feature set and checkpoint it so it is computed only once.
df_features = feature_extractor.transform(df).localCheckpoint()
df_features.limit(10).toPandas().head(5)
| date | id | cat_id | item_id | lag_42 | count_consecutive_value_lag_21 | lag_7 | window_30_lag_14_mean | window_14_lag_14_mean | window_30_lag_21_mean | count_consecutive_value_lag_28 | lag_49 | lag_56 | window_14_lag_28_mean | dept_id | window_30_lag_7_mean | count_consecutive_value_lag_14 | window_7_lag_28_mean | window_30_lag_28_mean | window_7_lag_21_mean | window_14_lag_7_mean | sales | lag_21 | lag_28 | lag_35 | window_7_lag_14_mean | window_14_lag_21_mean | window_7_lag_7_mean | count_consecutive_value_lag_7 | store_id | christmas | state_id | lag_14 | history_length | day_of_month | day_of_week | week_of_year | quarter | month | year | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2011-01-31 | FOODS_1_011_WI_2_evaluation | FOODS | FOODS_1_011 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | FOODS_1 | NaN | NaN | NaN | NaN | NaN | NaN | 2.0 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | WI_2 | 0 | WI | NaN | 1 | 31 | 2 | 5 | 1 | 1 | 2011 |
| 1 | 2011-02-01 | FOODS_1_011_WI_2_evaluation | FOODS | FOODS_1_011 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | FOODS_1 | NaN | NaN | NaN | NaN | NaN | NaN | 0.0 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | WI_2 | 0 | WI | NaN | 2 | 1 | 3 | 5 | 1 | 2 | 2011 |
| 2 | 2011-02-02 | FOODS_1_011_WI_2_evaluation | FOODS | FOODS_1_011 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | FOODS_1 | NaN | NaN | NaN | NaN | NaN | NaN | 0.0 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | WI_2 | 0 | WI | NaN | 3 | 2 | 4 | 5 | 1 | 2 | 2011 |
| 3 | 2011-02-03 | FOODS_1_011_WI_2_evaluation | FOODS | FOODS_1_011 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | FOODS_1 | NaN | NaN | NaN | NaN | NaN | NaN | 0.0 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | WI_2 | 0 | WI | NaN | 4 | 3 | 5 | 5 | 1 | 2 | 2011 |
| 4 | 2011-02-04 | FOODS_1_011_WI_2_evaluation | FOODS | FOODS_1_011 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | FOODS_1 | NaN | NaN | NaN | NaN | NaN | NaN | 0.0 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | WI_2 | 0 | WI | NaN | 5 | 4 | 6 | 5 | 1 | 2 | 2011 |
Split dataset into train and test#
# Hold out everything after the cutoff date as the unseen forecast period.
_CUTOFF_DATE = "2016-05-22"
df_train = df_features.filter(F.col("date") <= _CUTOFF_DATE)
df_future = df_features.filter(F.col("date") > _CUTOFF_DATE)
Training#
# One ForecastFlowML instance trains an independent LightGBM model per group.
model = ForecastFlowML(
    # dataset parameters
    group_col="state_id",      # one model is trained per distinct state
    id_col="id",
    date_col="date",
    target_col="sales",
    date_frequency="days",
    # model parameters: direct multi-step forecasting — with a 28-day
    # maximum horizon and a 7-day per-model horizon, 4 LightGBM models
    # are fit per group, one per 7-day chunk of the horizon.
    model_horizon=7,
    max_forecast_horizon=28,
    model=LGBMRegressor(),
)
# Training runs in parallel across groups; the result is a DataFrame of
# serialized models, checkpointed so training is not re-triggered later.
trained_models = model.train(df_train).localCheckpoint()
trained_models.limit(3).toPandas()
C:\spark-3.0.0-bin-hadoop3.2\python\pyspark\sql\pandas\group_ops.py:76: UserWarning:
It is preferred to use 'applyInPandas' over this API. This API will be deprecated in the future releases. See SPARK-28264 for more details.
| group | forecast_horizon | model | start_time | end_time | elapsed_seconds | |
|---|---|---|---|---|---|---|
| 0 | CA | [[1, 2, 3, 4, 5, 6, 7], [8, 9, 10, 11, 12, 13,... | [[128, 3, 99, 108, 105, 103, 104, 116, 103, 98... | 05-Apr-2023 (03:22:30) | 05-Apr-2023 (03:22:42) | 11.6 |
| 1 | TX | [[1, 2, 3, 4, 5, 6, 7], [8, 9, 10, 11, 12, 13,... | [[128, 3, 99, 108, 105, 103, 104, 116, 103, 98... | 05-Apr-2023 (03:22:32) | 05-Apr-2023 (03:22:43) | 11.1 |
| 2 | WI | [[1, 2, 3, 4, 5, 6, 7], [8, 9, 10, 11, 12, 13,... | [[128, 3, 99, 108, 105, 103, 104, 116, 103, 98... | 05-Apr-2023 (03:22:43) | 05-Apr-2023 (03:22:48) | 5.2 |
Backtesting#
# Backtest with 3 expanding time-based splits; each split yields a 28-day
# out-of-sample forecast per series (cv column identifies the split).
cv_forecast = model.cross_validate(df_train, n_cv_splits=3).localCheckpoint()
cv_forecast.limit(5).toPandas()
C:\spark-3.0.0-bin-hadoop3.2\python\pyspark\sql\pandas\group_ops.py:76: UserWarning:
It is preferred to use 'applyInPandas' over this API. This API will be deprecated in the future releases. See SPARK-28264 for more details.
| group | id | date | cv | target | forecast | |
|---|---|---|---|---|---|---|
| 0 | CA | FOODS_1_051_CA_4_evaluation | 2016-04-25 | 0 | 1.0 | 1.138023 |
| 1 | CA | FOODS_1_051_CA_4_evaluation | 2016-04-26 | 0 | 1.0 | 1.083809 |
| 2 | CA | FOODS_1_051_CA_4_evaluation | 2016-04-27 | 0 | 0.0 | 1.046020 |
| 3 | CA | FOODS_1_051_CA_4_evaluation | 2016-04-28 | 0 | 1.0 | 1.153849 |
| 4 | CA | FOODS_1_051_CA_4_evaluation | 2016-04-29 | 0 | 2.0 | 1.164669 |
| 5 | CA | FOODS_1_051_CA_4_evaluation | 2016-04-30 | 0 | 1.0 | 1.419541 |
| 6 | CA | FOODS_1_051_CA_4_evaluation | 2016-05-01 | 0 | 0.0 | 1.511467 |
| 7 | CA | FOODS_1_179_CA_2_evaluation | 2016-04-25 | 0 | 1.0 | 0.426190 |
| 8 | CA | FOODS_1_179_CA_2_evaluation | 2016-04-26 | 0 | 0.0 | 0.419881 |
| 9 | CA | FOODS_1_179_CA_2_evaluation | 2016-04-27 | 0 | 0.0 | 0.376968 |
Plot cross validation forecasts#
# Aggregate actuals and per-split CV forecasts to state level for plotting.
cv_state = (
    df_train.select("id", "state_id", "date", "sales")
    # Left join keeps all history; forecast columns are null outside CV windows.
    .join(
        cv_forecast.select("id", "date", "cv", "forecast"),
        on=["id", "date"],
        how="left",
    )
    # Pivot the cv split index into one forecast column per split (0, 1, 2).
    .groupBy("id", "state_id", "date", "sales")
    .pivot("cv")
    .sum("forecast")
    # Sum across items to get state-level actuals and forecasts per day.
    .groupBy("state_id", "date")
    .agg(
        F.sum("sales").alias("sales"),
        # Rename pivoted columns "0","1","2" to "cv_0","cv_1","cv_2".
        *[F.sum(f"{i}").alias(f"cv_{i}") for i in range(3)],
    )
    .orderBy("state_id", "date")
).toPandas()
# Overlay actual sales with each CV split's forecast, one facet per state.
pio.renderers.default = "notebook"
_cv_series = ["sales"] + [f"cv_{split}" for split in range(3)]
fig = px.line(
    cv_state,
    x="date",
    y=_cv_series,
    facet_row="state_id",
    facet_row_spacing=0.01,
    height=1250,
    width=720,
)
# Horizontal legend centered just above the plot; tight outer margins.
_legend_style = dict(orientation="h", yanchor="top", y=1.04, xanchor="center", x=0.5)
fig.update_layout(legend=_legend_style, margin=dict(l=0, r=10, t=10, b=10))
# Let each facet scale its own y-axis; drop the repeated axis titles.
fig.update_yaxes(matches=None, title="")
fig.update_xaxes(type="date", range=["2015-01-01", "2016-05-22"])
Inference#
# Score the 28-day future period with the models trained earlier.
forecast = model.predict(df_future, trained_models)
forecast.limit(5).toPandas()
C:\spark-3.0.0-bin-hadoop3.2\python\pyspark\sql\pandas\group_ops.py:76: UserWarning:
It is preferred to use 'applyInPandas' over this API. This API will be deprecated in the future releases. See SPARK-28264 for more details.
| id | date | prediction | |
|---|---|---|---|
| 0 | FOODS_1_051_CA_4_evaluation | 2016-05-23 | 0.979816 |
| 1 | FOODS_1_051_CA_4_evaluation | 2016-05-24 | 0.890988 |
| 2 | FOODS_1_051_CA_4_evaluation | 2016-05-25 | 0.794059 |
| 3 | FOODS_1_051_CA_4_evaluation | 2016-05-26 | 0.929395 |
| 4 | FOODS_1_051_CA_4_evaluation | 2016-05-27 | 1.031517 |
# Combine historical actuals with the future predictions at state level.
past_future = (
    df.select("id", "state_id", "date", "sales")
    # Left join: prediction is null for historical dates, filled for the future.
    .join(forecast, on=["id", "date"], how="left")
    .groupBy("state_id", "date")
    .agg(
        F.sum("sales").alias("sales"),
        F.sum("prediction").alias("prediction"),
    )
    .orderBy("state_id", "date")
    .toPandas()
)
# Plot history and the future forecast together, one facet per state.
pio.renderers.default = "notebook"
fig = px.line(
    past_future,
    x="date",
    y=["sales", "prediction"],
    facet_row="state_id",
    facet_row_spacing=0.01,
    height=1250,
    width=720,
)
# Horizontal legend centered just above the plot; tight outer margins.
_legend_style = dict(orientation="h", yanchor="top", y=1.04, xanchor="center", x=0.5)
fig.update_layout(legend=_legend_style, margin=dict(l=0, r=10, t=10, b=10))
# Independent y-axes per facet, no repeated axis titles.
fig.update_yaxes(matches=None, title="")
# Extend the x-range past the training cutoff to show the forecast window.
fig.update_xaxes(type="date", range=["2015-01-01", "2016-06-19"])